内存存储 (MemoryCheckpointStore) #

目录 #

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构概览
  5. 详细组件分析
  6. 依赖关系分析
  7. 性能考虑
  8. 故障排除指南
  9. 结论

简介 #

MemoryCheckpointStore 是 LangGraphGo 框架中的一个内存检查点存储实现,专门设计用于开发测试和无持久化需求的场景。它使用并发安全的内存映射(sync.Map)来临时存储检查点数据,提供了极低的读写延迟,但需要注意的是,进程终止后所有数据都会丢失。

该存储实现遵循统一的 CheckpointStore 接口,确保与其他存储实现(如 PostgreSQL、Redis、SQLite)的一致性设计,便于在不同环境中切换存储后端。

项目结构 #

MemoryCheckpointStore 的实现位于 graph/checkpointing.go 文件中,与整个检查点系统的核心功能紧密集成:

graph TB
subgraph "检查点存储层次结构"
A[CheckpointStore接口] --> B[MemoryCheckpointStore]
A --> C[FileCheckpointStore]
A --> D[PostgresCheckpointStore]
A --> E[RedisCheckpointStore]
A --> F[SqliteCheckpointStore]
end
subgraph "使用层"
G[CheckpointableRunnable] --> A
H[CheckpointableMessageGraph] --> A
end
subgraph "配置层"
I[CheckpointConfig] --> A
end

图表来源

章节来源

核心组件 #

MemoryCheckpointStore 的核心组件包括:

数据结构 #

并发控制 #

接口实现 #

章节来源

架构概览 #

MemoryCheckpointStore 采用简洁的内存存储架构,通过 Go 的内置 map 和 sync 包实现并发安全的数据存储:

classDiagram
class MemoryCheckpointStore {
+map[string]*Checkpoint checkpoints
+sync.RWMutex mutex
+NewMemoryCheckpointStore() *MemoryCheckpointStore
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint, error
+List(ctx, executionID) []*Checkpoint, error
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class Checkpoint {
+string ID
+string NodeName
+interface State
+map[string]interface Metadata
+time.Time Timestamp
+int Version
}
class CheckpointStore {
<<interface>>
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint, error
+List(ctx, executionID) []*Checkpoint, error
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
MemoryCheckpointStore ..|> CheckpointStore
MemoryCheckpointStore --> Checkpoint : stores

图表来源

详细组件分析 #

初始化和创建 #

MemoryCheckpointStore 通过工厂函数 NewMemoryCheckpointStore() 创建实例:

sequenceDiagram
participant Client as 客户端代码
participant Factory as NewMemoryCheckpointStore
participant Store as MemoryCheckpointStore
participant Map as 内存映射
Client->>Factory : 调用 NewMemoryCheckpointStore()
Factory->>Store : 创建新实例
Factory->>Map : make(map[string]*Checkpoint)
Store-->>Client : 返回初始化的存储实例

图表来源

Save 操作实现 #

Save 操作负责将检查点保存到内存存储中:

flowchart TD
Start([Save 开始]) --> Lock["获取写锁<br/>mutex.Lock()"]
Lock --> Store["存储检查点<br/>checkpoints[checkpoint.ID] = checkpoint"]
Store --> Unlock["释放写锁<br/>defer mutex.Unlock()"]
Unlock --> Return["返回 nil 错误"]
Return --> End([Save 结束])

图表来源

Load 操作实现 #

Load 操作从内存中检索指定 ID 的检查点:

flowchart TD
Start([Load 开始]) --> RLock["获取读锁<br/>mutex.RLock()"]
RLock --> Lookup["查找检查点<br/>checkpoints[checkpointID]"]
Lookup --> Exists{"检查点存在?"}
Exists --> |是| Return["返回检查点"]
Exists --> |否| NotFound["返回 'checkpoint not found' 错误"]
Return --> RUnlock["释放读锁<br/>defer mutex.RUnlock()"]
NotFound --> RUnlock
RUnlock --> End([Load 结束])

图表来源

List 操作实现 #

List 操作返回特定执行 ID 的所有检查点:

flowchart TD
Start([List 开始]) --> RLock["获取读锁<br/>mutex.RLock()"]
RLock --> Iterate["遍历所有检查点"]
Iterate --> CheckExec["检查执行 ID 匹配"]
CheckExec --> Match{"匹配执行 ID?"}
Match --> |是| Add["添加到结果列表"]
Match --> |否| Next["下一个检查点"]
Add --> Next
Next --> More{"还有更多检查点?"}
More --> |是| CheckExec
More --> |否| Return["返回检查点列表"]
Return --> RUnlock["释放读锁<br/>defer mutex.RUnlock()"]
RUnlock --> End([List 结束])

图表来源

Delete 操作实现 #

Delete 操作移除指定 ID 的检查点:

flowchart TD
Start([Delete 开始]) --> Lock["获取写锁<br/>mutex.Lock()"]
Lock --> Remove["删除检查点<br/>delete(checkpoints, checkpointID)"]
Remove --> Unlock["释放写锁<br/>defer mutex.Unlock()"]
Unlock --> Return["返回 nil 错误"]
Return --> End([Delete 结束])

图表来源

Clear 操作实现 #

Clear 操作移除特定执行 ID 的所有检查点:

flowchart TD
Start([Clear 开始]) --> Lock["获取写锁<br/>mutex.Lock()"]
Lock --> Iterate["遍历所有检查点"]
Iterate --> CheckExec["检查执行 ID 匹配"]
CheckExec --> Match{"匹配执行 ID?"}
Match --> |是| Remove["删除检查点<br/>delete(checkpoints, id)"]
Match --> |否| Next["下一个检查点"]
Remove --> Next
Next --> More{"还有更多检查点?"}
More --> |是| CheckExec
More --> |否| Unlock["释放写锁<br/>defer mutex.Unlock()"]
Unlock --> Return["返回 nil 错误"]
Return --> End([Clear 结束])

图表来源

章节来源

依赖关系分析 #

MemoryCheckpointStore 与其他组件的依赖关系:

graph TD
subgraph "外部依赖"
A[sync 包] --> B[互斥锁机制]
C[context 包] --> D[上下文支持]
E[fmt 包] --> F[错误格式化]
end
subgraph "内部依赖"
G[Checkpoint 结构] --> H[数据模型]
I[CheckpointStore 接口] --> J[统一接口]
end
subgraph "使用组件"
K[CheckpointableRunnable] --> L[自动检查点]
M[CheckpointableMessageGraph] --> N[图形检查点]
O[CheckpointConfig] --> P[配置管理]
end
A --> B
C --> D
E --> F
G --> H
I --> J
K --> L
M --> N
O --> P

图表来源

章节来源

性能考虑 #

优势特性 #

  1. 极低延迟:由于数据完全存储在内存中,读写操作几乎无延迟
  2. 高并发支持:使用读写锁实现高效的并发访问控制
  3. 简单高效:无需外部依赖,部署简单

性能特征 #

操作类型 延迟特征 并发支持 内存使用
Save O(1) 常数时间 写锁保护 线性增长
Load O(1) 常数时间 读锁保护 固定开销
List O(n) 线性时间 读锁保护 线性增长
Delete O(1) 常数时间 写锁保护 线性减少
Clear O(n) 线性时间 写锁保护 线性减少

限制因素 #

  1. 数据持久性:进程重启后数据完全丢失
  2. 内存限制:受系统内存容量限制
  3. 单机部署:无法在多节点环境中共享状态

故障排除指南 #

常见问题及解决方案 #

1. 检查点未找到错误 #

症状:调用 Load 或其他操作时返回 “checkpoint not found” 错误 原因:检查点可能已被删除或不存在 解决方案

2. 内存泄漏问题 #

症状:长时间运行后内存使用持续增长 原因:检查点数量过多且未及时清理 解决方案

3. 并发访问冲突 #

症状:出现竞态条件或数据不一致 原因:多个 goroutine 同时访问存储 解决方案

章节来源

最佳实践 #

单元测试中的使用 #

MemoryCheckpointStore 在单元测试中表现出色:

sequenceDiagram
participant Test as 测试函数
participant Store as MemoryCheckpointStore
participant Runnable as CheckpointableRunnable
Test->>Store : 创建内存存储
Test->>Runnable : 配置自动检查点
Test->>Runnable : 执行工作流
Runnable->>Store : 自动保存检查点
Test->>Runnable : 查询检查点
Runnable->>Store : 获取检查点列表
Store-->>Test : 返回检查点数据
Test->>Store : 清理测试数据

图表来源

生产环境注意事项 #

虽然 MemoryCheckpointStore 不适用于生产环境,但在以下场景中可以作为开发和测试工具:

  1. 快速原型开发:验证检查点逻辑而不依赖外部存储
  2. 单元测试:提供快速、可靠的测试环境
  3. 本地开发:简化开发流程,无需配置数据库
  4. 性能测试:评估应用性能而不受存储瓶颈影响

迁移到生产环境 #

当需要迁移到生产环境时,建议的存储选择:

  1. PostgreSQL:适合需要强一致性和复杂查询的应用
  2. Redis:适合需要高性能和缓存特性的应用
  3. SQLite:适合轻量级和嵌入式应用
  4. 文件存储:适合简单的文件系统部署

章节来源

结论 #

MemoryCheckpointStore 是 LangGraphGo 框架中一个精心设计的内存存储实现,具有以下特点:

主要优势 #

适用场景 #

限制和替代方案 #

由于缺乏持久化能力,MemoryCheckpointStore 不适用于生产环境。对于生产部署,应考虑使用 PostgreSQL、Redis 或 SQLite 等持久化存储解决方案。

通过理解 MemoryCheckpointStore 的实现原理和使用场景,开发者可以更好地利用 LangGraphGo 框架的检查点功能,在不同的开发阶段选择合适的存储策略。